Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10210] [STREAMING] Filter out non-existent blocks before creating BlockRDD #8405

Closed
wants to merge 4 commits into from

Conversation

tdas
Copy link
Contributor

@tdas tdas commented Aug 25, 2015

When write ahead log is not enabled, a recovered streaming driver still tries to run jobs using pre-failure block ids, and fails as the block do not exists in-memory any more (and cannot be recovered as receiver WAL is not enabled).

This occurs because the driver-side WAL of ReceivedBlockTracker is recovers that past block information, and ReceiveInputDStream creates BlockRDDs even if those blocks do not exist.

The solution in this PR is to filter out block ids that do not exist before creating the BlockRDD. In addition, it adds unit tests to verify other logic in ReceiverInputDStream.

@SparkQA
Copy link

SparkQA commented Aug 25, 2015

Test build #41495 has finished for PR 8405 at commit 89aaf9b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 25, 2015

Test build #41497 has finished for PR 8405 at commit 9a81b93.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -116,7 +116,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
new BlockRDD[T](ssc.sc, blockIds)
val validBlockIds = blockIds.filter { id =>
ssc.sparkContext.env.blockManager.master.contains(id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth to add some warning log here? I think the user may forget to enable receiver log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added warning.

@zsxwing
Copy link
Member

zsxwing commented Aug 25, 2015

LGTM except one minor comment

@SparkQA
Copy link

SparkQA commented Aug 25, 2015

Test build #41507 has finished for PR 8405 at commit 2b6f76e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Aug 25, 2015

LGTM

@SparkQA
Copy link

SparkQA commented Aug 25, 2015

Test build #41521 has finished for PR 8405 at commit 7590937.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 25, 2015

Test build #41523 has finished for PR 8405 at commit c62e37d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented Aug 25, 2015

Thanks @zsxwing for reviewing. Merging this to master and 1.5

asfgit pushed a commit that referenced this pull request Aug 25, 2015
…ing BlockRDD

When write ahead log is not enabled, a recovered streaming driver still tries to run jobs using pre-failure block ids, and fails as the block do not exists in-memory any more (and cannot be recovered as receiver WAL is not enabled).

This occurs because the driver-side WAL of ReceivedBlockTracker is recovers that past block information, and ReceiveInputDStream creates BlockRDDs even if those blocks do not exist.

The solution in this PR is to filter out block ids that do not exist before creating the BlockRDD. In addition, it adds unit tests to verify other logic in ReceiverInputDStream.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8405 from tdas/SPARK-10210.

(cherry picked from commit 1fc3758)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 1fc3758 Aug 25, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants